import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Demo5MapReduceReadHBase {
    // Read the students table from HBase and count the number of students in each class
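    // Note: TableMapper<KEYOUT, VALUEOUT> already fixes the map input types to
    // ImmutableBytesWritable (the row key) and Result (the row contents), so only
    // the map output key/value types are declared here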
    public static class ReadHBaseMapper extends TableMapper<Text, IntWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // key is the row key, value holds all columns returned by the Scan for that row
            String rowkey = Bytes.toString(key.get());
            byte[] clazzBytes = value.getValue(Bytes.toBytes("info"), Bytes.toBytes("clazz"));
            if (clazzBytes == null) {
                return; // skip rows without an info:clazz column
            }
            String clazz = Bytes.toString(clazzBytes);
            // emit (class, 1) for every student row
            context.write(new Text(clazz), new IntWritable(1));
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2:2181");

        Job job = Job.getInstance(conf);
        job.setJobName(Demo5MapReduceReadHBase.class.getName());
        job.setJarByClass(Demo5MapReduceReadHBase.class);

        // Configure the map task
        // using the TableMapReduceUtil helper provided by HBase
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("students")
                , new Scan()
                , ReadHBaseMapper.class
                , Text.class
                , IntWritable.class
                , job
        );
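        // initTableMapperJob also sets the input format to TableInputFormat and the
        // map output key/value classes, so they do not need to be set separately.
        // If only info:clazz is needed, the Scan could be narrowed before passing it
        // in, e.g. (optional tuning, not required for correctness):
        //   Scan scan = new Scan();
        //   scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("clazz"));
        //   scan.setCaching(500);        // fetch more rows per RPC
        //   scan.setCacheBlocks(false);  // recommended for full-table MapReduce scans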


        // Configure the reduce task
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
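        // Optionally, the reducer could also serve as a combiner, since the counts
        // are simply summed: job.setCombinerClass(MyReducer.class);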

        FileOutputFormat.setOutputPath(job, new Path("/data/hbase/clazz_num"));
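        // Note: the output directory must not already exist, otherwise the job fails
        // with a FileAlreadyExistsException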

        // Exit with a non-zero status if the job fails
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        /*
        hadoop jar HBaseJavaAPI11-1.0.jar Demo5MapReduceReadHBase
         */
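        /*
        Note: the HBase client classes must be on the classpath when submitting;
        one common way is `export HADOOP_CLASSPATH=$(hbase classpath)` before
        running hadoop jar, or bundling the HBase dependencies into a fat jar.
         */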
    }

}
